package one.mixin.android.websocket

import android.annotation.SuppressLint
import com.google.firebase.crashlytics.FirebaseCrashlytics
import com.google.gson.Gson
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import okio.ByteString
import one.mixin.android.Constants.API.Mixin_WS_URL
import one.mixin.android.Constants.API.WS_URL
import one.mixin.android.MixinApplication
import one.mixin.android.api.ClientErrorException
import one.mixin.android.api.service.AccountService
import one.mixin.android.db.MixinDatabase
import one.mixin.android.db.OffsetDao
import one.mixin.android.db.insertNoReplace
import one.mixin.android.db.makeMessageStatus
import one.mixin.android.db.pending.PendingDatabase
import one.mixin.android.di.isNeedSwitch
import one.mixin.android.extension.gzip
import one.mixin.android.extension.networkConnected
import one.mixin.android.extension.runOnUiThread
import one.mixin.android.extension.ungzip
import one.mixin.android.job.DecryptCallMessage.Companion.listPendingOfferHandled
import one.mixin.android.job.MixinJobManager
import one.mixin.android.job.RefreshOffsetJob
import one.mixin.android.session.Session
import one.mixin.android.util.ErrorHandler.Companion.AUTHENTICATION
import one.mixin.android.util.FLOOD_THREAD
import one.mixin.android.util.GzipException
import one.mixin.android.util.SINGLE_DB_THREAD
import one.mixin.android.util.reportException
import one.mixin.android.vo.FloodMessage
import one.mixin.android.vo.LinkState
import one.mixin.android.vo.MessageStatus
import one.mixin.android.vo.Offset
import one.mixin.android.vo.STATUS_OFFSET
import one.mixin.android.vo.createAckJob
import timber.log.Timber
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

class ChatWebSocket(
    private var applicationScope: CoroutineScope,
    private val okHttpClient: OkHttpClient,
    private val accountService: AccountService,
    private val mixinDatabase: MixinDatabase,
    private val pendingDatabase: PendingDatabase,
    private val jobManager: MixinJobManager,
    private val linkState: LinkState,
) : WebSocketListener() {
    private val offsetDao: OffsetDao = mixinDatabase.offsetDao()

    private val failCode = 1000
    private val quitCode = 1001
    var connected: Boolean = false
    private var client: WebSocket? = null
    private val transactions = ConcurrentHashMap<String, WebSocketTransaction>()
    private val gson = Gson()
    private val accountId = Session.getAccountId()
    private var homeUrl = Mixin_WS_URL

    companion object {
        val TAG = ChatWebSocket::class.java.simpleName
    }

    init {
        connected = false
    }

    private var hostFlag = false

    @Synchronized
    fun connect() {
        if (client == null) {
            connected = false
            homeUrl =
                if (hostFlag) {
                    Mixin_WS_URL
                } else {
                    WS_URL
                }
            client = okHttpClient.newWebSocket(Request.Builder().url(homeUrl).build(), this)
        }
    }

    @Synchronized
    fun disconnect() {
        if (client != null) {
            closeInternal(quitCode)
            transactions.clear()
            connectTimer?.dispose()
            client = null
            connected = false
        }
    }

    @Synchronized
    fun sendMessage(
        blazeMessage: BlazeMessage,
        timeout: Long = 5,
    ): BlazeMessage? {
        var bm: BlazeMessage? = null
        val latch = CountDownLatch(1)
        val transaction =
            WebSocketTransaction(
                blazeMessage.id,
                object : TransactionCallbackSuccess {
                    override fun success(data: BlazeMessage) {
                        bm = data
                        latch.countDown()
                    }
                },
                object : TransactionCallbackError {
                    override fun error(data: BlazeMessage?) {
                        bm = data
                        latch.countDown()
                    }
                },
            )
        if (client != null && connected) {
            transactions[blazeMessage.id] = transaction
            val result = client!!.send(gson.toJson(blazeMessage).gzip())
            if (result) {
                latch.await(timeout, TimeUnit.SECONDS)
            }
        } else {
            Timber.tag(TAG).e("WebSocket not connect")
        }
        return bm
    }

    private fun sendPendingMessage() {
        val blazeMessage = createListPendingMessage(pendingDatabase.getLastBlazeMessageCreatedAt())
        val transaction =
            WebSocketTransaction(
                blazeMessage.id,
                object : TransactionCallbackSuccess {
                    override fun success(data: BlazeMessage) {
                        listPendingOfferHandled = false
                    }
                },
                object : TransactionCallbackError {
                    override fun error(data: BlazeMessage?) {
                        sendPendingMessage()
                    }
                },
            )
        transactions[blazeMessage.id] = transaction
        client?.send(gson.toJson(blazeMessage).gzip())
    }

    override fun onOpen(
        webSocket: WebSocket,
        response: Response,
    ) {
        if (client != null) {
            connected = true
            client = webSocket
            webSocketObserver?.onSocketOpen()
            MixinApplication.appContext.runOnUiThread {
                linkState.state = LinkState.ONLINE
            }
            connectTimer?.dispose()
            jobManager.start()
            jobManager.addJobInBackground(RefreshOffsetJob())
            sendPendingMessage()
        }
    }

    override fun onMessage(
        webSocket: WebSocket,
        bytes: ByteString,
    ) {
        applicationScope.launch(SINGLE_DB_THREAD) {
            try {
                val json = bytes.ungzip()
                val blazeMessage = gson.fromJson(json, BlazeMessage::class.java)
                if (blazeMessage.error == null) {
                    if (transactions[blazeMessage.id] != null) {
                        transactions[blazeMessage.id]!!.success.success(blazeMessage)
                        transactions.remove(blazeMessage.id)
                    }
                    if (blazeMessage.data != null && blazeMessage.isReceiveMessageAction()) {
                        handleReceiveMessage(blazeMessage)
                    }
                } else {
                    if (transactions[blazeMessage.id] != null) {
                        transactions[blazeMessage.id]!!.error.error(blazeMessage)
                        transactions.remove(blazeMessage.id)
                    }
                    if (blazeMessage.action == ERROR_ACTION && blazeMessage.error.code == AUTHENTICATION) {
                        try {
                            val errorDescription = "Force logout webSocket.\nblazeMessage: $blazeMessage"
                            val ise = IllegalStateException(errorDescription)
                            FirebaseCrashlytics.getInstance().log("401 $errorDescription")
                            reportException(ise)
                            val response = accountService.getMe().execute()
                            if (response.body()?.errorCode == AUTHENTICATION) {
                                connected = false
                                closeInternal(quitCode)
                            } else {
                                closeInternal(failCode)
                            }
                        } catch (e: Exception) {
                            reportException(e)
                        }
                    }
                }
            } catch (e: GzipException) {
                reportException(e)
            }
        }
    }

    @SuppressLint("CheckResult")
    @Synchronized
    override fun onClosed(
        webSocket: WebSocket,
        code: Int,
        reason: String,
    ) {
        connected = false
        if (code == failCode) {
            closeInternal(code)
            jobManager.stop()
            if (connectTimer == null || connectTimer?.isDisposed == true) {
                connectTimer =
                    Observable.interval(2000, TimeUnit.MILLISECONDS).subscribe(
                        {
                            if (MixinApplication.appContext.networkConnected() && Session.checkToken()) {
                                connect()
                            }
                        },
                        {
                        },
                    )
            }
        } else {
            webSocket.cancel()
        }
    }

    private var connectTimer: Disposable? = null

    @Synchronized
    override fun onFailure(
        webSocket: WebSocket,
        t: Throwable,
        response: Response?,
    ) {
        if (t.isNeedSwitch()) {
            hostFlag = !hostFlag
        }
        Timber.e(t, "WebSocket onFailure $homeUrl")
        if (client != null) {
            if (t is ClientErrorException && t.code == AUTHENTICATION) {
                closeInternal(quitCode)
            } else {
                onClosed(webSocket, failCode, "OK")
            }
        }
    }

    private fun closeInternal(code: Int) {
        try {
            connected = false
            if (client != null) {
                client!!.close(code, "OK")
            }
        } catch (e: Exception) {
            reportException(e)
        } finally {
            client = null
            webSocketObserver?.onSocketClose()
            MixinApplication.appContext.runOnUiThread {
                linkState.state = LinkState.OFFLINE
            }
        }
    }

    private fun handleReceiveMessage(blazeMessage: BlazeMessage) {
        val data = gson.fromJson(blazeMessage.data, BlazeMessageData::class.java)
        if (blazeMessage.action == ACKNOWLEDGE_MESSAGE_RECEIPT) {
            mixinDatabase.makeMessageStatus(data.status, data.messageId) // Ack of the server, conversationId is ""
            pendingDatabase.makeMessageStatus(data.status, data.messageId)
            val offset = offsetDao.getStatusOffset()
            if (offset == null || offset != data.updatedAt) {
                offsetDao.insert(Offset(STATUS_OFFSET, data.updatedAt))
            }
        } else if (blazeMessage.action == CREATE_MESSAGE || blazeMessage.action == CREATE_CALL || blazeMessage.action == CREATE_KRAKEN) {
            if (data.userId == accountId && data.category.isEmpty()) { // Ack of the create message
                mixinDatabase.makeMessageStatus(data.status, data.messageId)
                pendingDatabase.makeMessageStatus(data.status, data.messageId)
            } else {
                applicationScope.launch(FLOOD_THREAD) {
                    val jsonData = gson.toJson(data)
                    if (jsonData.isNullOrBlank()) {
                        reportException(IllegalArgumentException("Error flood data: ${blazeMessage.id} ${blazeMessage.action}"))
                        mixinDatabase.jobDao().insertNoReplace(createAckJob(ACKNOWLEDGE_MESSAGE_RECEIPTS, BlazeAckMessage(data.messageId, MessageStatus.DELIVERED.name)))
                        return@launch
                    }
                    pendingDatabase.insertFloodMessage(FloodMessage(data.messageId, jsonData, data.createdAt))
                }
            }
        } else {
            pendingDatabase.insertJob(createAckJob(ACKNOWLEDGE_MESSAGE_RECEIPTS, BlazeAckMessage(data.messageId, MessageStatus.READ.name)))
        }
    }

    private var webSocketObserver: WebSocketObserver? = null

    fun setWebSocketObserver(webSocketObserver: WebSocketObserver?) {
        this.webSocketObserver = webSocketObserver
    }

    interface WebSocketObserver {
        fun onSocketClose()

        fun onSocketOpen()
    }
}
